package com.carol.bigdata.task.feature.cag

import com.carol.bigdata.Config
import com.carol.bigdata.constant.KVConstant
import com.carol.bigdata.utils.HBaseUtil
import com.carol.bigdata.utils.{Flag, TimeUtil}
import com.carol.bigdata.Config.hbaseParams
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import com.carol.bigdata.utils.FeatureUtil.{calFeat, calTotal, changeProfileRDD, getDataRowKey}
import com.carol.bigdata.utils.RddReader.readHiveRDD


/**
 * Daily job that derives per-user social features (column family "social_tag") from
 * gift-sending and chat action events, writes the daily values to the user profile
 * HBase table, and then writes the accumulated "_total" variants produced by `calTotal`.
 */
object calSocialTag {

    // Basic fields shared by all sources
    val keyColumns: List[String] = KVConstant.keyColumns
    val keyLen: Int = keyColumns.length
    val cf: String = KVConstant.cf
    val timeField: String = KVConstant.timeField
    val propertiesField: String = KVConstant.propertiesField

    // Data sources
    // Gift-sending interaction events
    val giftTable: String = KVConstant.actionTable
    val giftColumns: List[String] = keyColumns ++ List(timeField)
    val giftPattern: String = "gift_send"
    // Index of the time column, i.e. directly after the key columns (== keyLen). The Hive read in
    // `run` selects `keyColumns :+ "hour"`, so the hour value sits at this same index.
    val giftTimeIndex: Int = giftColumns.indexOf(timeField)

    // Chat interaction events
    val chatTable: String = KVConstant.actionTable
    val chatColumns: List[String] = keyColumns :+ propertiesField
    val chatPattern: String = "cag_chat"
    val chatPropIdx: Int = chatColumns.indexOf(propertiesField)
    val chatPropColumns: List[String] = List("chat_type")

    // Game daily-updated intermediate table
    // User statistics result table
    val userProfileTable: String = KVConstant.userProfileTable
    val keyCF: String = KVConstant.keyCF
    val keyCols: List[String] = KVConstant.dateKeyColumns
    val writeKeyCols: List[String] = KVConstant.writeKeyCols
    val event: String = "profile"

    // User social feature columns (social_tag); each entry is (column name, value type)
    val socialMiddleCf: String = "social_tag"
    val giftFeat: List[(String, String)] = List(
        ("$gift_send_time", "map"),
        ("$gift_send_count", "int")
    )
    val chatFeat: List[(String, String)] = List(
        ("$chat_type", "map"),
        ("$chat_count", "int")
    )

    /** Debug output: prints the row count of `rdd` under `label`, then up to five sample rows. */
    private def logSample(label: String, rdd: RDD[_]): Unit = {
        println(s"$label: ${rdd.count()}")
        rdd.take(5).foreach(println)
    }

    /**
     * Aggregates interaction rows per user key and derives the features listed in `featList`.
     *
     * Each input row is split into its first `keyLen` values (the key) and the remaining info
     * values. Rows sharing a key are merged column-wise, so that for every key each info column
     * becomes the list of all values seen for that key. `calFeat` (defined in FeatureUtil) then
     * turns those per-column lists into the feature values.
     *
     * The returned RDD is cached because `run` reuses it for both the daily write and `calTotal`;
     * callers that are finished with it may call `unpersist()`.
     *
     * @param statDay       statistics day, prepended to every output key
     * @param interactUsers rows of key-column values followed by info values
     * @param featList      (feature name, value type) pairs forwarded to `calFeat`
     * @return one `(statDay +: key, featureValues)` pair per distinct key
     */
    def calSocialInfo(statDay: String,
                      interactUsers: RDD[List[String]],
                      featList: List[(String, String)]): RDD[(List[String], List[String])] = {
        val actionRDD = interactUsers
            .map { row =>
                val (key, info) = row.splitAt(keyLen)
                (key, info.map(List(_)))
            }
            // Concatenate the per-column value lists of two partial aggregates.
            .reduceByKey((a, b) => a.zip(b).map { case (left, right) => left ++ right })
        val socialInfoRDD = actionRDD
            .map { case (key, columns) => (statDay +: key, calFeat(columns, featList)) }
            .cache()
        println(s"feat: ${featList}")
        logSample("socialInfoRDD", socialInfoRDD)
        socialInfoRDD
    }

    /**
     * Computes and writes the social features of one game for one statistics day.
     *
     * Steps: read gift/chat events from Hive, aggregate them into daily features, write those to
     * `userProfileTable`, then compute the "_total" features via `calTotal` (which is given
     * yesterday's profile row key) and write them as well.
     *
     * @param hbaseParams HBase connection parameters forwarded to HBaseUtil/calTotal
     * @param spark       active Spark session (Hive support required for readHiveRDD)
     * @param statDay     statistics day, formatted as accepted by TimeUtil (e.g. "2021-08-01")
     * @param game        game id used to select events and build row keys
     */
    def run(hbaseParams: Map[String, String],
            spark: SparkSession,
            statDay: String,
            game: String): Unit = {
        println("计算社交特征")
        val yesterday: String = TimeUtil.getTimeDeltaDay(statDay, -1)
        val yesterdayRowKey = List(getDataRowKey(game, event, yesterday))

        // 1. Read the day's events from Hive (these replaced an earlier HBaseUtil.readAsList path).
        // Gift: List(key columns..., hour) => List(key columns..., last two chars of hour, "1")
        val giftSendUsers = readHiveRDD(spark, giftTable, statDay, giftPattern, game, keyColumns :+ "hour", List())
            .map { row =>
                row.zipWithIndex.map { case (value, i) => if (i == giftTimeIndex) value.takeRight(2) else value } :+ "1"
            }
            .cache()
        logSample("giftSendUsers", giftSendUsers)
        // Chat: List(key columns..., chat_type) => List(key columns..., chat_type, "1")
        val chatUsers = readHiveRDD(spark, chatTable, statDay, chatPattern, game, keyColumns, chatPropColumns)
            .map(_ :+ "1")
            .cache()
        logSample("chatUsers", chatUsers)

        // 2. Compute the daily features.
        val giftSocialRDD = calSocialInfo(statDay, giftSendUsers, giftFeat)
        val chatSocialRDD = calSocialInfo(statDay, chatUsers, chatFeat)
        // calSocialInfo has already materialised (and cached) its result, so the raw inputs can go.
        giftSendUsers.unpersist()
        chatUsers.unpersist()
        val writeGiftSocialRDD = changeProfileRDD(giftSocialRDD)
        val writeChatSocialRDD = changeProfileRDD(chatSocialRDD)

        // 3. Write the daily features into the profile table, column family `socialMiddleCf`.
        val giftFeatColumns = giftFeat.map(_._1)
        val chatFeatColumns = chatFeat.map(_._1)
        HBaseUtil.writeRDD2KVColumn(hbaseParams, userProfileTable, writeGiftSocialRDD, keyCF, writeKeyCols, giftFeatColumns,
            List.fill(giftFeatColumns.length)(socialMiddleCf))
        HBaseUtil.writeRDD2KVColumn(hbaseParams, userProfileTable, writeChatSocialRDD, keyCF, writeKeyCols, chatFeatColumns,
            List.fill(chatFeatColumns.length)(socialMiddleCf))

        // 4. Compute the "_total" features.
        // NOTE(review): assumes calTotal returns exactly List(intRDD, mapRDD) in that order; the
        // column lists in step 5 are paired with the RDDs in the same order.
        val totalGiftFeat = giftFeat.map(x => (x._1 + "_total", x._2))
        val totalChatFeat = chatFeat.map(x => (x._1 + "_total", x._2))
        val List(giftTotalIntRDD, giftTotalMapRDD) = calTotal(hbaseParams, spark, statDay, giftSocialRDD,
            userProfileTable, List(keyCF, socialMiddleCf), keyCols, totalGiftFeat, yesterdayRowKey)
        val List(chatTotalIntRDD, chatTotalMapRDD) = calTotal(hbaseParams, spark, statDay, chatSocialRDD,
            userProfileTable, List(keyCF, socialMiddleCf), keyCols, totalChatFeat, yesterdayRowKey)

        val rddLists = List(giftTotalIntRDD, giftTotalMapRDD, chatTotalIntRDD, chatTotalMapRDD)
        val rddList = rddLists.map(changeProfileRDD)

        // 5. Write the "_total" features, pairing each RDD with its int- or map-typed column names.
        val giftTotalIntColumns = totalGiftFeat.filter(_._2.toLowerCase == "int").map(_._1)
        val giftTotalMapColumns = totalGiftFeat.filter(_._2.toLowerCase.contains("map")).map(_._1)
        val chatTotalIntColumns = totalChatFeat.filter(_._2.toLowerCase == "int").map(_._1)
        val chatTotalMapColumns = totalChatFeat.filter(_._2.toLowerCase.contains("map")).map(_._1)
        val colsList = List(giftTotalIntColumns, giftTotalMapColumns, chatTotalIntColumns, chatTotalMapColumns)
        for ((rdd, cols) <- rddList.zip(colsList))
            HBaseUtil.writeRDD2KVColumn(hbaseParams, userProfileTable, rdd, keyCF, writeKeyCols, cols,
                List.fill(cols.length)(socialMiddleCf))

        giftSocialRDD.unpersist()
        chatSocialRDD.unpersist()
    }

    /**
     * Entry point: runs `run` for each hard-coded game and day, then stops Spark even if a run fails.
     *
     * NOTE(review): `run` takes no timezone parameter, so iterating over timezones here would only
     * repeat the identical job; add a timezone argument to `run` before reintroducing such a loop.
     */
    def main(args: Array[String]): Unit = {
        Flag.Parse(args)
        val spark = SparkSession.builder()
            .config(conf = Config.sparkConf)
            .enableHiveSupport()
            .getOrCreate()
        spark.sparkContext.setLogLevel("ERROR")
        val gameList = List("200")
        try {
            for (game <- gameList; day <- 1 to 1) {
                val statday = "2021-08-%02d".format(day)
                TimeUtil.timer(run(hbaseParams, spark, statday, game))
            }
        } finally {
            spark.stop()
        }
    }
}
